// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.


#ifndef IMPALA_EXEC_TEXT_CONVERTER_INLINE_H
#define IMPALA_EXEC_TEXT_CONVERTER_INLINE_H

#include "text-converter.h"

#include <boost/algorithm/string.hpp>

#include "runtime/runtime-state.h"
#include "runtime/descriptors.h"
#include "runtime/tuple.h"
#include "util/coding-util.h"
#include "util/string-parser.h"
#include "runtime/string-value.h"
#include "runtime/date-value.h"
#include "runtime/timestamp-value.h"
#include "runtime/mem-pool.h"
#include "runtime/string-value.inline.h"
#include "exprs/string-functions.h"

namespace impala {

/// Note: this function has a codegen'd version.  Changing this function requires
/// corresponding changes to CodegenWriteSlot().
inline bool TextConverter::WriteSlot(const SlotDescriptor* slot_desc, Tuple* tuple,
    const char* data, int len, bool copy_string, bool need_escape, MemPool* pool) {
  if ((len == 0 && !slot_desc->type().IsStringType()) || data == NULL) {
    tuple->SetNull(slot_desc->null_indicator_offset());
    return true;
  } else if (check_null_ && len == null_col_val_.size() &&
      StringCompare(data, len, null_col_val_.data(), null_col_val_.size(), len) == 0) {
    // We matched the special NULL indicator.
    tuple->SetNull(slot_desc->null_indicator_offset());
    return true;
  }

  StringParser::ParseResult parse_result = StringParser::PARSE_SUCCESS;
  void* slot = tuple->GetSlot(slot_desc->tuple_offset());

  // Parse the raw-text data. Translate the text string to internal format.
  const ColumnType& type = slot_desc->type();
  switch (type.type) {
    case TYPE_STRING:
    case TYPE_VARCHAR:
    case TYPE_CHAR: {
      int buffer_len = len;
      if (type.type == TYPE_VARCHAR || type.type == TYPE_CHAR) buffer_len = type.len;

      bool reuse_data = type.IsVarLenStringType() &&
          !(len != 0 && (copy_string || need_escape));

      bool base64_decode = false;
      if (type.IsBinaryType() && decode_binary_ && len != 0) {
        base64_decode = true;
        reuse_data = false;
        int64_t out_len;
        if (!Base64DecodeBufLen(data, len, &out_len)) {
          parse_result = StringParser::PARSE_FAILURE;
          break;
        }
        buffer_len = out_len;
      }

      StringValue::SimpleString str;
      str.ptr = nullptr;
      str.len = std::min(buffer_len, len);
      if (reuse_data) {
        str.ptr = const_cast<char*>(data);
      } else {
        // The codegen version of this code (generated by CodegenWriteSlot()) doesn't
        // include this path. In other words, 'reuse_data' will always be true in the
        // codegen version:
        // 1. CodegenWriteSlot() doesn't yet support slot of TYPE_CHAR
        // 2. HdfsScanner::InitializeWriteTuplesFn() will not codegen if there is
        //    any escape character.
        // 3. HdfsScanner::WriteCompleteTuple() always calls this function with
        //    'copy_string' == false.
        str.ptr = type.IsVarLenStringType() ?
            reinterpret_cast<char*>(pool->TryAllocateUnaligned(buffer_len)) :
            reinterpret_cast<char*>(slot);
        if (UNLIKELY(str.ptr == nullptr)) {
          parse_result = StringParser::PARSE_FAILURE;
          break;
        }
        if (base64_decode) {
          unsigned out_len;
          if(!Base64Decode(data, len, buffer_len, str.ptr, &out_len)) {
            parse_result = StringParser::PARSE_FAILURE;
            break;
          }
          DCHECK_LE(out_len, buffer_len);
          str.len = out_len;
        } else if (need_escape) {
          // Use a temporary variable on the stack to avoid accessing an unaligned
          // pointer.
          int str_len = str.len;
          UnescapeString(data, str.ptr, &str_len, buffer_len);
          str.len = str_len;
        } else {
          memcpy(str.ptr, data, str.len);
        }
      }
      DCHECK_NE(str.ptr, nullptr);

      if (type.type == TYPE_CHAR) {
        StringValue::PadWithSpaces(str.ptr, buffer_len, str.len);
        str.len = type.len;
      }
      // write back to the slot, if !IsVarLenStringType() we already wrote to the slot
      if (type.IsVarLenStringType()) {
        StringValue* str_slot = reinterpret_cast<StringValue*>(slot);
        str_slot->Assign(str.ptr, str.len);
      }
      break;
    }
    case TYPE_BOOLEAN:
      *reinterpret_cast<bool*>(slot) =
        StringParser::StringToBool(data, len, &parse_result);
      break;
    case TYPE_TINYINT:
      *reinterpret_cast<int8_t*>(slot) =
        StringParser::StringToInt<int8_t>(data, len, &parse_result);
      break;
    case TYPE_SMALLINT:
      *reinterpret_cast<int16_t*>(slot) =
        StringParser::StringToInt<int16_t>(data, len, &parse_result);
      break;
    case TYPE_INT:
      *reinterpret_cast<int32_t*>(slot) =
        StringParser::StringToInt<int32_t>(data, len, &parse_result);
      break;
    case TYPE_BIGINT:
      *reinterpret_cast<int64_t*>(slot) =
        StringParser::StringToInt<int64_t>(data, len, &parse_result);
      break;
    case TYPE_FLOAT:
      *reinterpret_cast<float*>(slot) =
        StringParser::StringToFloat<float>(data, len, &parse_result);
      break;
    case TYPE_DOUBLE:
      *reinterpret_cast<double*>(slot) =
        StringParser::StringToFloat<double>(data, len, &parse_result);
      break;
    case TYPE_TIMESTAMP: {
      TimestampValue* ts_slot = reinterpret_cast<TimestampValue*>(slot);
      *ts_slot = TimestampValue::ParseSimpleDateFormat(data, len);
      if (!ts_slot->HasDate()) {
        parse_result = StringParser::PARSE_FAILURE;
      }
      break;
    }
    case TYPE_DATE: {
      *reinterpret_cast<DateValue*>(slot) =
          StringParser::StringToDate(data, len, &parse_result);
      break;
    }
    case TYPE_DECIMAL: {
      switch (slot_desc->slot_size()) {
        case 4:
          *reinterpret_cast<Decimal4Value*>(slot) =
              StringParser::StringToDecimal<int32_t>(
                  data, len, slot_desc->type(), false, &parse_result);
          break;
        case 8:
          *reinterpret_cast<Decimal8Value*>(slot) =
              StringParser::StringToDecimal<int64_t>(
                  data, len, slot_desc->type(), false, &parse_result);
          break;
        case 12:
          DCHECK(false) << "Planner should not generate this.";
          break;
        case 16:
          *reinterpret_cast<Decimal16Value*>(slot) =
              StringParser::StringToDecimal<int128_t>(
                  data, len, slot_desc->type(), false, &parse_result);
          break;
        default:
          DCHECK(false) << "Decimal slots can't be this size.";
      }
      if (parse_result != StringParser::PARSE_SUCCESS) {
        // Don't accept underflow and overflow for decimals.
        parse_result = StringParser::PARSE_FAILURE;
      }
      break;
    }
    default:
      DCHECK(false) << "bad slot type: " << slot_desc->type();
      break;
  }

  if (UNLIKELY(parse_result != StringParser::PARSE_SUCCESS)) {
    if (parse_result == StringParser::PARSE_FAILURE ||
        (strict_mode_ && parse_result == StringParser::PARSE_OVERFLOW)) {
      tuple->SetNull(slot_desc->null_indicator_offset());
      return false;
    }
  }

  return true;
}

}

#endif
